package com.atguigu.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimCreateTableMapFunction;
import com.atguigu.app.func.DimSinkFunction;
import com.atguigu.app.func.DimTableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//数据流：web/app -> Nginx -> 业务服务器(Mysql) -> Maxwell -> Kafka(ODS) -> FlinkApp -> HBase(DIM)
//程  序：Mock -> maxwell.sh -> Kafka(ZK) -> DimApp -> HBase(HDFS ZK)
/**
 * Flink job entry point that joins a Kafka change-data stream with a
 * broadcast stream of table-processing configuration and forwards the
 * result to a sink.
 *
 * <p>Pipeline, as wired in {@link #main(String[])}:
 * <ol>
 *   <li>Read raw strings from the Kafka topic {@code Constant.TOPIC_ODS_DB}.</li>
 *   <li>Parse each record into a {@code JSONObject}; unparsable or empty
 *       records are logged and dropped.</li>
 *   <li>Read the {@code gmall-230524-config.table_process} table through
 *       Flink CDC (initial snapshot, then changes).</li>
 *   <li>Map the CDC records with {@code DimCreateTableMapFunction} and
 *       broadcast them as {@code TableProcess} values.</li>
 *   <li>Connect both streams and process them with
 *       {@code DimTableProcessFunction}.</li>
 *   <li>Print the result and hand it to {@code DimSinkFunction}.</li>
 * </ol>
 *
 * <p>NOTE(review): what the three project functions do internally is not
 * visible from this file. Per the original author's notes they create the
 * dimension tables, filter the data stream by configuration, and write to
 * HBase respectively -- confirm against their sources.
 */
public class DimApp {

    /**
     * Builds the streaming topology and submits it under the job name {@code "DimApp"}.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In production, keep the parallelism equal to the partition count of the Kafka topic.
        env.setParallelism(1);

        // 1.1 Enable checkpointing (disabled in this setup).
        //env.enableCheckpointing(60000 * 5);
        //env.setStateBackend(new HashMapStateBackend());

        // 1.2 Checkpoint-related settings (disabled in this setup).
        //CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        //checkpointConfig.setCheckpointTimeout(10000L);
        //checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        // Retain the last checkpoint when the job is cancelled.
        //checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        // Restart strategy.
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));

        // Step 2: create the data stream from the Kafka topic.
        // NOTE(review): the second argument is presumably the consumer group id -- confirm in KafkaUtil.
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(Constant.TOPIC_ODS_DB, "dim_app_230524");
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        // Step 3: convert every record to a JSON object, dropping dirty data.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (value == null) {
                    return;
                }
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    if (jsonObject == null) {
                        // fastjson yields null instead of throwing for empty text or the
                        // literal "null"; never emit a null element into the stream.
                        System.out.println("脏数据：" + value);
                        return;
                    }
                    out.collect(jsonObject);
                } catch (JSONException e) {
                    // Malformed JSON: log the raw record and skip it so the job keeps running.
                    System.out.println("脏数据：" + value);
                }
            }
        });

        // Step 4: read the configuration table with Flink CDC.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                // NOTE(review): credentials are hard-coded; consider moving them to configuration.
                .username("root")
                .password("000000")
                .databaseList("gmall-230524-config")
                .tableList("gmall-230524-config.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");

        // Step 5: turn the configuration stream into a broadcast stream and connect it to the data stream.
        // The same descriptor instance is passed to the process function below.
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("bc-state", String.class, TableProcess.class);
        BroadcastStream<TableProcess> broadcastDS = mysqlDS
                .map(new DimCreateTableMapFunction())
                .broadcast(mapStateDescriptor);
        BroadcastConnectedStream<JSONObject, TableProcess> connectDS = jsonObjDS.connect(broadcastDS);

        // Step 6: process the connected stream (per the original notes: filter the data stream by configuration).
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectDS.process(new DimTableProcessFunction(mapStateDescriptor));
        hbaseDS.print("hbaseDS>>>>>");

        // Step 7: write the processed records to the sink (per the original notes: HBase).
        hbaseDS.addSink(new DimSinkFunction());

        // Step 8: launch the job.
        env.execute("DimApp");

    }

}
